Uthread: switching between threads

思路

xv6 已经实现了进程的切换机制,本实验要求参考进程的切换,实现一个用户态线程的切换。

要实现线程切换,必然涉及上下文,即寄存器的保存和恢复,那么需要保存哪些寄存器?实际上,只需要保存被调用者保存寄存器(callee-saved registers),而实现调用者保存寄存器(caller-saved registers)的保存与恢复的代码由编译器自动生成。关于调用者保存与被调用者保存寄存器有哪些可以参照下述 RISC-V 的 calling convention:

另外,根据 user/uthread_switch.S 的注释,thread_switch 最后通过 ret 指令将当前程序计数器的值切换为 ra 寄存器中存储的地址,实现进程的“切换”,因此 struct thread 中还需要保存每个线程对应程序的起始地址(即函数指针)。

在了解需要保存哪些寄存器之后以及如何进行线程切换之后,还有一个细节需要考虑,即栈指针寄存器(sp)的初始化。线程栈的存储位置为 struct thread 中的 stack 数组,那么 sp 应该指向 stack 的位置,但由于栈的地址从大到小增长,因此 sp 应该初始化为 (uint64)t->stack + STACK_SIZE.

代码

diff --git a/user/uthread.c b/user/uthread.c
index 06349f5..74b7f20 100644
--- a/user/uthread.c
+++ b/user/uthread.c
@@ -12,6 +12,20 @@
 
 
 struct thread {
+  /* 0 */  uint64 ra;
+  /* 8 */  uint64 sp;
+  /* 16 */  uint64 s0;
+  /* 24 */ uint64 s1;
+  /* 32 */ uint64 s2;
+  /* 40 */ uint64 s3;
+  /* 48 */ uint64 s4;
+  /* 56 */ uint64 s5;
+  /* 64 */ uint64 s6;
+  /* 72 */ uint64 s7;
+  /* 80 */ uint64 s8;
+  /* 88 */ uint64 s9;
+  /* 96 */ uint64 s10;
+  /* 104 */ uint64 s11;
   char       stack[STACK_SIZE]; /* the thread's stack */
   int        state;             /* FREE, RUNNING, RUNNABLE */
 };
@@ -62,6 +76,7 @@ thread_schedule(void)
      * Invoke thread_switch to switch from t to next_thread:
      * thread_switch(??, ??);
      */
+	thread_switch((uint64)t, (uint64)current_thread);
   } else
     next_thread = 0;
 }
@@ -76,6 +91,8 @@ thread_create(void (*func)())
   }
   t->state = RUNNABLE;
   // YOUR CODE HERE
+  t->ra = (uint64)func;
+  t->sp = (uint64)t->stack + STACK_SIZE;
 }
 
 void 
diff --git a/user/uthread_switch.S b/user/uthread_switch.S
index 5defb12..0eb0a2c 100644
--- a/user/uthread_switch.S
+++ b/user/uthread_switch.S
@@ -7,5 +7,34 @@
 
 	.globl thread_switch
 thread_switch:
	/* YOUR CODE HERE */
+	sd ra, 0(a0)
+	sd sp, 8(a0)
+	sd s0, 16(a0)
+	sd s1, 24(a0)
+	sd s2, 32(a0)
+	sd s3, 40(a0)
+	sd s4, 48(a0)
+	sd s5, 56(a0)
+	sd s6, 64(a0)
+	sd s7, 72(a0)
+	sd s8, 80(a0)
+	sd s9, 88(a0)
+	sd s10, 96(a0)
+	sd s11, 104(a0)
+
+	ld ra, 0(a1)
+	ld sp, 8(a1)
+	ld s0, 16(a1)
+	ld s1, 24(a1)
+	ld s2, 32(a1)
+	ld s3, 40(a1)
+	ld s4, 48(a1)
+	ld s5, 56(a1)
+	ld s6, 64(a1)
+	ld s7, 72(a1)
+	ld s8, 80(a1)
+	ld s9, 88(a1)
+	ld s10, 96(a1)
+	ld s11, 104(a1)
 	ret    /* return to ra */

Using threads

思路

后两个实验与 xv6 无关,而是练习使用 POSIX 线程库在实际的 Linux 平台进行并发编程。

本实验要求使用锁机制,实现一个支持并发的哈希表。首先需要确定的是:哪部分的操作会出现竞态(race condition)?根据观察不难得知 put() 操作可能存在下面这种情况:

线程 1 和线程 2 本次 put() 映射到一个桶中(i 相同),都执行完 line 46 ~ 49 的循环之后,e 都为 0,随后先后执行 insert(),都创建一个新的 entry,并先后更新 table[i] 的值,导致先插入的键被覆盖。
像这样,在一次插入操作未完成的情况下,另一次插入也开始进行且映射到一个桶中,就会导致丢键(keys missing)的情况发生。

首先最简单无脑的办法就是给整个 put() 函数加一把大锁:

pthread_mutex_lock(&lock);  // lock
for (e = table[i]; e != 0; e = e->next) {
    if (e->key == key)
        break;
}
if(e){
    // update the existing key.
    e->value = value;
} else {
    // the new is new.
    insert(key, value, &table[i], table[i]);
}
pthread_mutex_unlock(&lock);  // unlock

可以看到,keys missing 的问题已经被解决,但是大锁带来的就是更低的性能,实际上根据上图可知,该实现在双核情况下的运行速度甚至慢于单核。

实际上,对 table 数组的遍历并不会导致竞态,因此将加锁的操作延迟到遍历结束后:

for (e = table[i]; e != 0; e = e->next) {
    if (e->key == key)
        break;
}
pthread_mutex_lock(&lock);  // lock
if(e){
    // update the existing key.
    e->value = value;
} else {
    // the new is new.
    insert(key, value, &table[i], table[i]);
}
pthread_mutex_unlock(&lock);  // unlock

做了上述修改后,仍然没有出现 key missing 的情况,同时效率提升了一倍以上。

最后,更细化一些,只有当两个 put() 映射到同一个桶时才会发生竞态,因此可以为每个桶分别设置一把锁,以进一步提高并发性:

for (e = table[i]; e != 0; e = e->next) {
    if (e->key == key)
        break;
}
pthread_mutex_lock(&locks[i]);  // lock
if(e){
    // update the existing key.
    e->value = value;
} else {
    // the new is new.
    insert(key, value, &table[i], table[i]);
}
pthread_mutex_unlock(&locks[i]);  // unlock

可见,效率又有进一步提升。

代码

diff --git a/notxv6/ph.c b/notxv6/ph.c
index 82afe76..321e269 100644
--- a/notxv6/ph.c
+++ b/notxv6/ph.c
@@ -17,6 +17,7 @@ struct entry *table[NBUCKET];
 int keys[NKEYS];
 int nthread = 1;
 
+pthread_mutex_t locks[NBUCKET];
 
 double
 now()
@@ -47,6 +48,7 @@ void put(int key, int value)
     if (e->key == key)
       break;
   }
+  pthread_mutex_lock(&locks[i]);
   if(e){
     // update the existing key.
     e->value = value;
@@ -54,7 +56,7 @@ void put(int key, int value)
     // the new is new.
     insert(key, value, &table[i], table[i]);
   }
-
+  pthread_mutex_unlock(&locks[i]);
 }
 
 static struct entry*
@@ -118,6 +120,10 @@ main(int argc, char *argv[])
     keys[i] = random();
   }
 
+
+  for (int i = 0; i < NBUCKET; ++i) {
+    pthread_mutex_init(&locks[i], NULL);
+  }
   //
   // first the puts
   //

Barrier

思路

最后一个实验主要是熟悉 POSIX 线程库中条件变量(conditional variable)的使用,实现的思路比较简单:前 nthread - 1 个线程在条件变量上休眠,最后一个线程将休眠的所有进程进行唤醒。有关条件变量的用法可以参考 OSTEP:OSTEP: Condition Variables.

代码

diff --git a/notxv6/barrier.c b/notxv6/barrier.c
index 12793e8..e4fd03e 100644
--- a/notxv6/barrier.c
+++ b/notxv6/barrier.c
@@ -30,7 +30,18 @@ barrier()
   // Block until all threads have called barrier() and
   // then increment bstate.round.
   //  
+
+  pthread_mutex_lock(&bstate.barrier_mutex);
+  ++bstate.nthread;
+  if (bstate.nthread == nthread) {
+    pthread_cond_broadcast(&bstate.barrier_cond);
+	++bstate.round;
+	bstate.nthread = 0;
+  }
+  else {
+    pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
+  }
+  pthread_mutex_unlock(&bstate.barrier_mutex);
 }